package org.zjvis.datascience.service.dataprovider;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.zjvis.datascience.common.config.DatasourceConfig;
import org.zjvis.datascience.common.constant.DatabaseConstant;
import org.zjvis.datascience.common.dto.DatabaseDTO;
import org.zjvis.datascience.common.exception.DataScienceException;
import org.zjvis.datascience.common.model.AggrConfig;
import org.zjvis.datascience.common.model.AggregateColumn;
import org.zjvis.datascience.common.model.AggregateResult;
import org.zjvis.datascience.common.model.Column;
import org.zjvis.datascience.common.model.ConfigComponent;
import org.zjvis.datascience.common.model.DimensionConfig;
import org.zjvis.datascience.common.model.MeasureConfig;
import org.zjvis.datascience.common.model.Table;
import org.zjvis.datascience.common.sql.SqlHelper;
import org.zjvis.datascience.common.util.DataUtil;
import org.zjvis.datascience.common.util.HeadUtil;
import org.zjvis.datascience.common.util.SqlUtil;
import org.zjvis.datascience.common.util.db.JDBCUtil;
import org.zjvis.datascience.common.vo.dataset.DatasetQueryVO;
import org.zjvis.datascience.service.DatabaseService;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidDataSourceFactory;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;

/**
 * @description JDBC data provider backed by a Druid connection pool; serves
 *              MySQL/Greenplum-style datasources and exposes pooled connections
 * @date 2021-12-22
 */
@Service("jdbcDataProvider")
public class JdbcDataProvider extends DataProvider implements Aggregator {

    /** Shared SLF4J logger for all provider operations. */
    protected final static Logger logger = LoggerFactory.getLogger(JdbcDataProvider.class);

    // Resolves datasource descriptors (url/type/credential json) by id.
    @Autowired
    private DatabaseService databaseService;

    // Placeholder for null values; not referenced in this chunk — TODO confirm usage elsewhere.
    private static final String NaN = "#NULL";

    /**
     * Build a Druid {@link DataSource} for the given database definition.
     *
     * @param database datasource descriptor: type (driver lookup), url schema,
     *                 and credential json ({@code username}/{@code password})
     * @param pooled   when true, size the pool from the configured limits;
     *                 when false, create a single-connection source
     * @return a configured {@link DruidDataSource}
     * @throws Exception if the Druid factory rejects the configuration
     */
    public DataSource getSource(DatabaseDTO database, boolean pooled) throws Exception {
        JSONObject credentials = JSONObject.parseObject(database.getConfigJson());

        Map<String, String> props = Maps.newHashMap();
        props.put(DruidDataSourceFactory.PROP_DRIVERCLASSNAME,
                DatasourceConfig.Driver.getDriver(database.getType()));
        props.put(DruidDataSourceFactory.PROP_URL, database.schema());
        props.put(DruidDataSourceFactory.PROP_USERNAME, credentials.getString("username"));
        props.put(DruidDataSourceFactory.PROP_PASSWORD, credentials.getString("password"));

        if (pooled) {
            props.put(DruidDataSourceFactory.PROP_MAXACTIVE, String.valueOf(DS_CONN_MAX_ACTIVE));
            props.put(DruidDataSourceFactory.PROP_INITIALSIZE, String.valueOf(DS_CONN_INIT_ACTIVE));
            props.put(DruidDataSourceFactory.PROP_MINIDLE, String.valueOf(DS_CONN_MIN_IDLE));
            logger.info("[Init GP connection pool size] -> {}", DS_CONN_INIT_ACTIVE);
        } else {
            props.put(DruidDataSourceFactory.PROP_MAXACTIVE, "1");
            props.put(DruidDataSourceFactory.PROP_INITIALSIZE, "1");
        }

        DruidDataSource dataSource =
                (DruidDataSource) DruidDataSourceFactory.createDataSource(props);
        dataSource.setMaxWait(DS_CONN_MAX_WAIT);
        dataSource.setKeepAlive(true);
        dataSource.setBreakAfterAcquireFailure(true);
        dataSource.setConnectionErrorRetryAttempts(DS_CONN_RETRY_NUM);
        // Reclaim connections abandoned for more than 5 minutes and log the leak site.
        dataSource.setRemoveAbandoned(true);
        dataSource.setRemoveAbandonedTimeout(60 * 5);
        dataSource.setLogAbandoned(true);
        // Validate idle connections with a cheap probe query.
        dataSource.setTestWhileIdle(true);
        dataSource.setValidationQuery("select 1");

        return dataSource;
    }

    /** Shortcut for {@link #getSource(Long, boolean)} with pooling enabled. */
    public DataSource getSource(Long dsId) {
        return getSource(dsId, true);
    }

    /**
     * Resolve the {@link DataSource} for a datasource id, caching it in
     * {@code dsMap} so the pool is built at most once per id.
     *
     * @param dsId   datasource id, looked up via {@code databaseService}
     * @param pooled passed through to {@link #getSource(DatabaseDTO, boolean)}
     * @return the cached or freshly created data source
     */
    public DataSource getSource(Long dsId, boolean pooled) {
        return dsMap.computeIfAbsent(dsId, key -> {
            DatabaseDTO database = databaseService.queryById(key);
            try {
                return getSource(database, pooled);
            } catch (Exception e) {
                logger.error("get jdbc source error", e);
                throw new DataScienceException(
                        String.format("get jdbc source error, since %s", e.getMessage()), e);
            }
        });
    }

    /**
     * Obtain a JDBC connection from the pooled datasource for {@code dsId}.
     *
     * <p>Historical note (translated from the original comment): with the
     * official Greenplum driver a pooled connection does not reconnect
     * automatically after a server restart — it still reports itself valid
     * but {@code prepareStatement} fails ("Message 1001 not found." /
     * "terminating connection due to administrator command"), and
     * {@code con.isValid()} cannot detect this. The pgsql driver reconnects
     * but lacks bit-column support, so it cannot replace the official one.
     * A three-attempt "select version()" validation loop used to live here;
     * it had been disabled and the dead code is now removed (see VCS history).
     *
     * @param dsId datasource id
     * @return a live connection; the caller is responsible for closing it
     * @throws Exception if the pool cannot supply a connection
     */
    public Connection getConn(Long dsId) throws Exception {
        return getSource(dsId).getConnection();
    }

    /**
     * List the database names visible on the given datasource.
     *
     * @param dsId datasource id
     * @return database names (first column of {@code show databases})
     * @throws DataScienceException wrapping any connection or query failure
     */
    @Override
    public List<String> showDatabases(Long dsId) throws DataScienceException {
        List<String> ret = Lists.newArrayList();
        // try-with-resources closes ResultSet/Statement/Connection even on error
        // (the old code leaked the statement and swallowed close() failures).
        try (Connection conn = getConn(dsId);
             Statement stat = conn.createStatement()) {
            stat.setQueryTimeout(SQL_QUERY_TIMEOUT_SECOND);
            try (ResultSet rs = stat.executeQuery("show databases")) {
                while (rs.next()) {
                    ret.add(rs.getString(1));
                }
            }
        } catch (Exception e) {
            logger.error("show databases error, dsId=" + dsId, e);
            throw new DataScienceException("dsId=" + dsId + "\n" + e.getMessage(), e);
        }
        return ret;
    }

    /**
     * List the table names in database {@code db} on the given datasource.
     *
     * @param dsId datasource id
     * @param db   database to switch to before listing tables
     * @return table names (first column of {@code show tables})
     * @throws DataScienceException wrapping any connection or query failure
     */
    @Override
    public List<String> showTables(Long dsId, String db) throws DataScienceException {
        List<String> ret = Lists.newArrayList();
        try (Connection conn = getConn(dsId);
             Statement stat = conn.createStatement()) {
            stat.setQueryTimeout(SQL_QUERY_TIMEOUT_SECOND);
            // SECURITY: "use" cannot be parameterized; db is concatenated verbatim,
            // so it must come from a trusted source.
            stat.execute("use " + db);
            try (ResultSet rs = stat.executeQuery("show tables")) {
                while (rs.next()) {
                    ret.add(rs.getString(1));
                }
            }
        } catch (Exception e) {
            // Restored: the error log line had been commented out.
            logger.error("show tables error, dsId=" + dsId, e);
            throw new DataScienceException("dsId=" + dsId + "\n" + e.getMessage(), e);
        }
        return ret;
    }

    /**
     * Assemble the aggregation SQL for {@code table}, binding the table's
     * column-type map into the helper first.
     */
    @Override
    public String getSql(Table table, AggrConfig config) throws DataScienceException {
        SqlHelper helper = new SqlHelper();
        helper.bind(getColumnTypes(table));
        return helper.assembleSql(config, table);
    }

    /**
     * Execute the aggregation described by {@code config} against {@code table}
     * and wrap the rows plus column headers into an {@link AggregateResult}.
     *
     * <p>SQL-generation failures are reported via
     * {@link AggregateResult#setErrorMsg(String)} (no throw); execution
     * failures throw {@link DataScienceException} — unchanged caller contract.
     *
     * @param table  target table (provides dsId and name)
     * @param config dimensions, measures and filters to aggregate by
     * @return result with one column per dimension followed by one per measure
     * @throws DataScienceException if the query itself fails
     */
    @Override
    public AggregateResult getData(Table table, AggrConfig config) throws DataScienceException {
        AggregateResult aggregateResult = new AggregateResult();
        String sql = StringUtils.EMPTY;
        try {
            sql = getSql(table, config);
        } catch (DataScienceException e1) {
            aggregateResult.setErrorMsg(e1.getApiResult().getMessage());
            return aggregateResult;
        } catch (Exception e2) {
            aggregateResult.setErrorMsg(String.format("something wrong when generate sql, since %s", e2.getMessage()));
            return aggregateResult;
        }
        logger.info("[getData] execute SQL -> {}", sql);

        int dimSize = config.getDimensions().size();
        int measureSize = config.getMeasures().size();
        int size = dimSize + measureSize;

        List<Object[]> rows = Lists.newArrayList();
        // try-with-resources replaces the manual finally/close (which leaked the
        // Statement and ResultSet).
        try (Connection conn = getConn(table.getDsId());
             Statement stat = conn.createStatement()) {
            stat.setQueryTimeout(SQL_QUERY_TIMEOUT_SECOND);
            try (ResultSet rs = stat.executeQuery(sql)) {
                while (rs.next()) {
                    Object[] record = new Object[size];
                    for (int i = 0; i < size; i++) {
                        // getObject already yields null for SQL NULL; the old
                        // if-null branch assigned the same value on both sides.
                        record[i] = rs.getObject(i + 1);
                    }
                    rows.add(record);
                }
            }
        } catch (Exception e) {
            logger.error("get " + table + " column value error", e);
            throw new DataScienceException(sql + "\n" + e.getMessage(), e);
        }

        List<AggregateColumn> aggrColumns = buildHeaderColumns(config, dimSize, measureSize);
        return aggregateResult.init(aggrColumns, rows.toArray(new Object[rows.size()][]));
    }

    /** Build the result header: dimension columns first, then measure columns. */
    private List<AggregateColumn> buildHeaderColumns(AggrConfig config, int dimSize,
                                                     int measureSize) {
        List<AggregateColumn> aggrColumns = Lists.newArrayListWithCapacity(dimSize + measureSize);
        for (int i = 0; i < dimSize; i++) {
            DimensionConfig dc = config.getDimensions().get(i);
            AggregateColumn column = new AggregateColumn();
            column.setAggrType(null);
            // Prefer the user-supplied alias over the raw field name.
            column.setName(StringUtils.isNotEmpty(dc.getAlias()) ? dc.getAlias() : dc.getFieldName());
            column.setIndex(i);
            column.setType(dc.getType());
            aggrColumns.add(column);
        }
        for (int i = 0; i < measureSize; i++) {
            MeasureConfig mc = config.getMeasures().get(i);
            AggregateColumn column = new AggregateColumn();
            column.setAggrType(mc.getAggType());
            column.setName(mc.getAlias());
            // Measure columns follow the dimension columns.
            column.setIndex(i + dimSize);
            aggrColumns.add(column);
        }
        return aggrColumns;
    }


    /**
     * Return {@code upper-cased column label -> java.sql.Types code} for the
     * table, caching the result under {@code table.key()}.
     *
     * @param table target table (dsId + name)
     * @return ordered label-to-type map (never null)
     * @throws DataScienceException wrapping any metadata-query failure
     */
    public Map<String, Integer> getColumnTypes(Table table) {
        Map<String, Integer> cached = columnTypeCache.get(table.key());
        if (!CollectionUtils.isEmpty(cached)) {
            return cached;
        }

        Map<String, Integer> ret = Maps.newLinkedHashMap();
        // WHERE 1=0 fetches metadata only — no rows are transferred.
        String sql = "SELECT * FROM " + table.getName() + " WHERE 1=0";
        // try-with-resources replaces the manual finally/close (which leaked the
        // Statement and ResultSet).
        try (Connection conn = getConn(table.getDsId());
             Statement stat = conn.createStatement();
             ResultSet rs = stat.executeQuery(sql)) {
            ResultSetMetaData metaData = rs.getMetaData();
            int columnCount = metaData.getColumnCount();
            for (int i = 1; i <= columnCount; i++) {
                ret.put(metaData.getColumnLabel(i).toUpperCase(), metaData.getColumnType(i));
            }
        } catch (Exception e) {
            logger.error("[getColumnTypes] get " + table + " column type name error, since {} and sql -> {}", e.getMessage(), sql);
            throw new DataScienceException(
                    String.format("get column type error, since %s", e.getMessage()), e);
        }

        columnTypeCache.put(table.key(), ret);
        return ret;
    }

    /**
     * Same as {@link #getColumnTypes(Table)} but preserves the original
     * column-label casing; cached under an md5 hash of {@code dsId|tableName}.
     *
     * @param table target table (dsId + name)
     * @return ordered label-to-type map (never null)
     * @throws DataScienceException wrapping any metadata-query failure
     */
    public Map<String, Integer> getColumnTypesOriginal(Table table) {
        String key = Hashing.md5().newHasher().putString(Joiner.on("|")
                        .join(table.getDsId(), StringUtils.isEmpty(table.getName()) ? "" : table.getName())
                , Charsets.UTF_8).hash().toString();
        Map<String, Integer> cached = columnTypeCache.get(key);
        if (!CollectionUtils.isEmpty(cached)) {
            return cached;
        }

        Map<String, Integer> ret = Maps.newLinkedHashMap();
        // WHERE 1=0 fetches metadata only — no rows are transferred.
        String sql = "SELECT * FROM " + table.getName() + " WHERE 1=0";
        // try-with-resources replaces the manual finally/close (which leaked the
        // Statement and ResultSet).
        try (Connection conn = getConn(table.getDsId());
             Statement stat = conn.createStatement();
             ResultSet rs = stat.executeQuery(sql)) {
            ResultSetMetaData metaData = rs.getMetaData();
            int columnCount = metaData.getColumnCount();
            for (int i = 1; i <= columnCount; i++) {
                ret.put(metaData.getColumnLabel(i), metaData.getColumnType(i));
            }
        } catch (Exception e) {
            logger.error("[getColumnTypesOriginal] get " + table + " column type name error, since {} and sql -> {}", e.getMessage(), sql);
            throw new DataScienceException(
                    String.format("get column type error, since %s", e.getMessage()), e);
        }

        columnTypeCache.put(key, ret);
        return ret;
    }


    /**
     * Resolve the column list for a table by inspecting the metadata of a
     * one-row probe query (see {@link #getColumnsFromSql}).
     */
    @Override
    public List<Column> getColumns(Table table) throws Exception {
        return getColumnsFromSql(table);
    }

    /**
     * Infer column metadata (label + lower-cased type name) by selecting a
     * single row from the table and reading the result-set metadata.
     *
     * @param table target table (dsId + name)
     * @return one {@link Column} per result column, in catalog order
     * @throws Exception wrapping any connection or query failure
     */
    private List<Column> getColumnsFromSql(Table table) throws Exception {
        List<Column> columns = Lists.newArrayList();
        String sql = "SELECT * FROM " + table.getName() + " LIMIT 1";
        // try-with-resources replaces the manual finally/close (which leaked the
        // PreparedStatement and ResultSet).
        try (Connection conn = getConn(table.getDsId());
             PreparedStatement stat = conn.prepareStatement(sql);
             ResultSet rs = stat.executeQuery()) {
            ResultSetMetaData metaData = rs.getMetaData();
            int columnCount = metaData.getColumnCount();
            for (int i = 1; i <= columnCount; i++) {
                String columnNameType = metaData.getColumnTypeName(i);
                String columnLabel = metaData.getColumnLabel(i);
                // Was System.out.println; demoted to debug-level logging.
                logger.debug("{}:{}:{}", columnNameType, metaData.getColumnType(i),
                        metaData.getColumnClassName(i));
                columns.add(new Column(columnLabel, columnNameType.toLowerCase(), columnLabel));
            }
        } catch (Exception e) {
            logger.error("get view(" + sql + ") column types error", e);
            throw new DataScienceException(sql + "\n" + e.getMessage(), e);
        }
        return columns;
    }

    @Override
    public List<String> getColumnValues(Table table, String column, AggrConfig config)
            throws DataScienceException {
        SqlHelper sqlHelper = new SqlHelper();
        sqlHelper.bind(getColumnTypes(table));
        String sqlQuery = "";
        sqlQuery = String
                .format(template, column, table.getName(), sqlHelper.assembleFilter(config));
        List<String> columnValues = new ArrayList<>();
        Connection conn = null;
        try {
            conn = getConn(table.getDsId());
            Statement stat = conn.createStatement();
            java.sql.ResultSet rs = stat.executeQuery(sqlQuery);
            while (rs.next()) {
                String value = rs.getString(column);
                if (StringUtils.isNotEmpty(value)) {
                    columnValues.add(value);
                }
            }
        } catch (Exception e) {
            logger.error(sqlQuery + " column value error", e);
            throw new DataScienceException(sqlQuery + "\n" + e.getMessage(), e);
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                }
            }
        }

        return columnValues;
    }

    /**
     * Fetch the column names of a table from information_schema on the
     * hard-coded datasource 1.
     *
     * @param table table name (injected verbatim into the query — trusted input only)
     * @return column names in catalog order, or {@code null} on any failure
     *         (legacy contract kept: callers treat null as "unavailable")
     */
    public List<String> getTableMetaInfo(String table) {
        String sql = String.format(
                "select column_name, data_type from information_schema.columns where table_name = '%s'",
                table);
        // try-with-resources replaces the manual finally/close (leaked Statement/ResultSet).
        try (Connection conn = getConn(1L);
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(sql)) {
            List<String> meta = new ArrayList<>();
            while (rs.next()) {
                meta.add(rs.getString("column_name"));
            }
            return meta;
        } catch (Exception e) {
            // Previously swallowed silently; log before returning null.
            logger.error("getTableMetaInfo error, table={}", table, e);
            return null;
        }
    }

    /**
     * Shortcut for {@link #getTableMetaMap(String, String)} using the default
     * "dataset" schema.
     */
    public Map<String, String> getTableMetaMap(String table) {
        return getTableMetaMap(table, "dataset");
    }

    /**
     * getTableMetaInfo
     *
     * @param table  table name (injected verbatim — trusted input only)
     * @param schema schema the table lives in
     * @return meta info, column_name -> column_type, insertion-ordered;
     *         empty map on failure (error is logged, not thrown)
     */
    public Map<String, String> getTableMetaMap(String table, String schema) {
        logger.warn("params send to getTableMetaMap are {} and {} ", table, schema);
        Map<String, String> ret = new LinkedHashMap<>();
        // WHERE 1=0 fetches metadata only — no rows are transferred.
        String sql = "SELECT * FROM " + schema + "." + table + " WHERE 1=0";
        // try-with-resources replaces the manual finally/close (leaked Statement/ResultSet).
        try (Connection conn = getConn(1L);
             Statement stat = conn.createStatement();
             ResultSet rs = stat.executeQuery(sql)) {
            ResultSetMetaData metaData = rs.getMetaData();
            int columnCount = metaData.getColumnCount();
            for (int i = 1; i <= columnCount; i++) {
                ret.put(metaData.getColumnName(i), metaData.getColumnTypeName(i));
            }
        } catch (Exception e) {
            // Legacy behavior: log and return the (possibly empty) map instead of throwing.
            logger.error("get " + schema + "." + table + " column type name error, since {} and sql -> {}", e.getMessage(), sql);
        }
        return ret;
    }

    /**
     * Return {@code column name -> column type name} for a table on the
     * hard-coded datasource 1, packed into a {@link JSONObject}.
     *
     * @param table table name, may include schema (injected verbatim)
     * @return type map; empty on failure (error is logged, not thrown)
     */
    public JSONObject getTableMeta(String table) {
        JSONObject ret = new JSONObject();
        // WHERE 1=0 fetches metadata only — no rows are transferred.
        String sql = "SELECT * FROM " + table + " WHERE 1=0";
        // try-with-resources replaces the manual finally/close (leaked Statement/ResultSet).
        try (Connection conn = getConn(1L);
             Statement stat = conn.createStatement();
             ResultSet rs = stat.executeQuery(sql)) {
            ResultSetMetaData metaData = rs.getMetaData();
            int columnCount = metaData.getColumnCount();
            for (int i = 1; i <= columnCount; i++) {
                ret.put(metaData.getColumnName(i), metaData.getColumnTypeName(i));
            }
        } catch (Exception e) {
            logger.error("get " + table + " column type name error", e);
        }
        return ret;
    }


    /**
     * Count the rows of {@code table} on the hard-coded datasource 1.
     *
     * @param table table name (injected verbatim — trusted input only)
     * @return the row count, or {@code null} on any failure or empty result
     *         (legacy contract kept for callers)
     */
    public Long getRecordCount(String table) {
        String sql = String.format("select count(1) as number from %s", table);
        // try-with-resources replaces the manual finally/close (leaked Statement/ResultSet).
        try (Connection conn = getConn(1L);
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(sql)) {
            // count(1) yields exactly one row; while-loop replaced with if.
            if (rs.next()) {
                return rs.getLong("number");
            }
        } catch (Exception e) {
            // Previously swallowed silently; log before returning null.
            logger.error("getRecordCount error, table={}", table, e);
        }
        return null;
    }

    /**
     * getPreviewData
     *
     * @param table table to preview (datasource 1, injected verbatim)
     * @param head  column names to project into each JSON row
     * @param limit max rows; a negative value means "no limit"
     * @return rows as a JSONArray, or {@code null} on any failure
     *         (legacy contract kept for callers)
     */
    public JSONArray getPreviewData(String table, List<String> head, int limit) {
        String sql = limit < 0
                ? String.format("select * from %s", table)
                : String.format("select * from %s limit %s", table, limit);
        JSONArray jsonArray = new JSONArray();
        // try-with-resources replaces the manual finally/close (leaked Statement/ResultSet).
        try (Connection conn = getConn(1L);
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(sql)) {
            while (rs.next()) {
                JSONObject item = new JSONObject();
                for (String column : head) {
                    item.put(column, rs.getString(column));
                }
                jsonArray.add(item);
            }
        } catch (Exception e) {
            // Previously swallowed silently; log before returning null.
            logger.error("getPreviewData error, sql={}", sql, e);
            return null;
        }
        return jsonArray;
    }

    /**
     * Query detail rows of {@code table} on the hard-coded datasource 1,
     * applying optional filter components, per-column sorting and a row limit.
     *
     * <p>Sorting: each entry of {@code sortJson} is either a plain direction
     * string ("asc"/"desc") or a JSON array giving an explicit custom value
     * order; the parseArray try/catch below is a type probe — exception-driven
     * control flow choosing between the two forms.
     *
     * @param table    fully qualified table name (schema.table)
     * @param head     fallback header, replaced by metadata-derived header on success
     * @param limit    max rows, or null for no limit
     * @param sortJson column -> sort spec (see above)
     * @param filters  optional filter components, compiled via SqlHelper
     * @param queryVO  optional paging VO; when present its sort suffix is used
     *                 and the total row count is written back into it
     * @return result rows; an AggregateResult with errorMsg set when the filter
     *         cannot be built; {@code null} on query failure
     */
    public AggregateResult getDataDetail(String table, List<AggregateColumn> head, Integer limit,
                                         JSONObject sortJson, List<ConfigComponent> filters, DatasetQueryVO queryVO) {
        AggregateResult ret = new AggregateResult();
        Connection conn = null;
        List<AggregateColumn> acList = head;
        List<Object[]> data = null;

        // Compile the filter components into a SQL fragment, if any.
        String filterSql = StringUtils.EMPTY;
        if (!CollectionUtils.isEmpty(filters)) {
            SqlHelper sqlHelper = new SqlHelper();
            Map<String, Integer> columnTypes = getColumnTypes(new Table(1L, table));
            sqlHelper.bind(columnTypes);
            try {
                filterSql = sqlHelper.assembleFilter(filters);
            }catch (DataScienceException e1){
                ret.setErrorMsg(e1.getApiResult().getMessage());
                return ret;
            }catch (Exception e){
                ret.setErrorMsg("something wrong happened when build filter condition.");
                return ret;
            }
        }

        try {
            conn = getConn(1L);
            Statement st = conn.createStatement();

            // flag == true means "no custom value-order sort" (simple path below).
            boolean flag = true;
            String sql = "";
            List<String> newSortList = new ArrayList<>();
            List<String> columnBody = new ArrayList<>();

            for (Map.Entry entry : sortJson.entrySet()) {
                String col = (String) entry.getKey();
                String sort = (String) entry.getValue();
                try {
                    // Type probe: succeeds only when the spec is a JSON array of
                    // explicit values -> build a CASE helper column to sort by.
                    JSONArray order = JSONArray.parseArray(sort);
                    flag = false;
                    String newCol = SqlUtil.formatPGSqlColName(col + "_after");
                    col = SqlUtil.formatPGSqlColName(col);
                    newSortList.add(newCol);
                    String orderSql = "";
                    for (int i = 0; i < order.size(); i++) {
                        String val = order.getString(i);
                        orderSql += String.format("when %s in ('%s') then %s ", col, val, i);
                    }
                    columnBody.add(String.format("case %s end as %s", orderSql, newCol));
                } catch (Exception e) {
                    // Plain "asc"/"desc" direction string.
                    newSortList.add(String.format("\"%s\" %s", col, sort));
                }
            }

            if (flag) {
                // Simple path: direct select with optional filter, sort and limit.
                sql = String.format("select * from %s", table);
                if (StringUtils.isNotEmpty(filterSql)) {
                    sql += " " + filterSql;
                }
                if (null != queryVO) {
                    sql += queryVO.toSortSuffix();
                }else {
                    if (!CollectionUtils.isEmpty(newSortList)) {
                        sql += " order by " + Joiner.on(",").join(newSortList);
                    }
                }
                if (limit != null) {
                    sql += " limit " + limit;
                }
                if (null != queryVO) {
                    // Paging support: compute the total row count for the VO.
                    ResultSet rs = null;
                    if (StringUtils.isNotEmpty(filterSql)) {
                        rs = st.executeQuery(String.format(DatabaseConstant.GP_COUNT_SQL, table + " " + filterSql));
                    }else {
                        rs = st.executeQuery(String.format(DatabaseConstant.GP_COUNT_SQL, table));
                    }
                    if (rs.next()) {
                        queryVO.setTotalRows(rs.getInt(1));
                    }
                }
            } else {
                // Custom-order path: materialize a temp table carrying the CASE
                // helper columns, select everything except those helpers ordered
                // by them, then drop the temp table.
                String tmpTable = table + "_tmp";
                sql = String.format("create table %s as select *, %s from %s", tmpTable,
                        Joiner.on(",").join(columnBody), table);
                st.execute(sql);
                String[] split = tmpTable.split("\\.");
                Map<String, String> tableMetaMap = getTableMetaMap(split[1], split[0]);
                List<String> cols = Lists.newLinkedList(tableMetaMap.keySet());
                for (String col : newSortList) {
                    cols.remove(col);
                }
                // NOTE(review): two statements in one executeQuery (select; drop) —
                // relies on the driver accepting multi-statement SQL; confirm.
                sql = String.format("select %s from %s order by %s;", Joiner.on(",").join(SqlUtil.formatPGSqlCols(cols)), tmpTable, Joiner.on(",").join(newSortList));
                sql += String.format("drop table if exists %s", tmpTable);
            }
            logger.warn("[getDataDetail] execute sql -> {}", sql);
            ResultSet rs = st.executeQuery(sql);
            acList = HeadUtil.wrapAggHead(rs.getMetaData());
            data = DataUtil.wrapDataFromAggHead(rs, acList);

        } catch (Exception e) {
            logger.error("getDataDetail Failed since, {}", e.getMessage());
            return null;
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception e) {
                }
            }
        }

        return ret.init(acList, data.toArray(new Object[data.size()][]));
    }

    /**
     * Per-cluster detail preview: for a clustering result table containing a
     * "cluster_id" column, return up to {@code limit} rows for EVERY cluster id
     * (0..max) as one union-all result.
     *
     * @param table    clustering result table on the hard-coded datasource 1
     * @param head     fallback header, replaced by metadata-derived header on success
     * @param limit    max rows per cluster
     * @param sortJson column -> "asc"/"desc" sort directions
     * @param filters  not used by this method — kept for signature compatibility
     * @param jsonObj  widget config; projected columns are read from
     *                 widgetJson.config.values[*].col
     * @return aggregated result, or {@code null} on any failure (legacy contract)
     */
    public AggregateResult getDataDetailTemp(String table, List<AggregateColumn> head,
                                             Integer limit,
                                             JSONObject sortJson, List<ConfigComponent> filters, JSONObject jsonObj) {
        JSONObject widgetJson = jsonObj.getJSONObject("widgetJson");
        JSONObject config = widgetJson.getJSONObject("config");
        JSONArray values = config.getJSONArray("values");
        int clusterNum = 0;
        String cols = "";
        // Projected columns = configured value columns + the cluster id itself.
        List<String> list = Lists.newArrayList();
        for (int i = 0; i < values.size(); i++) {
            list.add(SqlUtil.formatPGSqlColName(values.getJSONObject(i).getString("col")));
        }
        list.add("\"cluster_id\"");
        cols = Joiner.on(",").join(list);
        Connection conn = null;
        List<AggregateColumn> acList = head;
        List<Object[]> data = Lists.newArrayList();

        // One sub-select per cluster id, later joined with "union all".
        // NOTE(review): the order-by slot (%s) precedes WHERE in the subquery —
        // looks suspect for non-empty orderBy; confirm against real usage.
        String sqlTemplate = "select %s from (select * from %s %s where cluster_id = '%d' limit %d) _%d";

        List<String> sortList = new ArrayList<>();
        for (Map.Entry entry : sortJson.entrySet()) {
            sortList.add(String.format("\"%s\" %s", entry.getKey(), entry.getValue()));
        }

        try {
            conn = getConn(1L);
            Statement st = conn.createStatement();
            String sql = "";
            String orderBy = "";
            if (!CollectionUtils.isEmpty(sortList)) {
                orderBy = "order by " + Joiner.on(",").join(sortList);
            }

            // The highest cluster id determines how many sub-selects to union.
            ResultSet rs = st.executeQuery("select max(\"cluster_id\") from " + table);
            if (rs.next()) {
                clusterNum = rs.getInt(1);
            }

            List<String> sqlList = Lists.newArrayList();
            for (int i = 0; i <= clusterNum; i++) {
                sqlList.add(String.format(sqlTemplate, cols, table, orderBy, i, limit, i));
            }
            sql = Joiner.on(" union all ").join(sqlList);
            logger.warn("[getDataDetailTemp] execute sql -> {}", sql);
            rs = st.executeQuery(sql);
            acList = HeadUtil.wrapAggHead(rs.getMetaData());
            data = DataUtil.wrapDataFromAggHead(rs, acList);

        } catch (Exception e) {
            // Legacy contract: any failure yields null (not logged here).
            return null;
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception e) {
                }
            }
        }

        return new AggregateResult(acList, data.toArray(new Object[data.size()][]));
    }

    /**
     * Drop a comma-separated list of redundant tables or views on the
     * hard-coded datasource 1.
     *
     * @param tables comma-separated object names (injected verbatim — trusted input only)
     * @param isView when true, objects are dropped as views — except
     *               "solid_tclean_" objects, which are physical tables
     * @throws DataScienceException if any drop statement fails
     */
    @Transactional(rollbackFor = Exception.class)
    public void dropRedundantTables(String tables, boolean isView) {
        // try-with-resources replaces the manual finally/close (leaked Statement).
        try (Connection conn = getConn(1L);
             Statement st = conn.createStatement()) {
            for (String table : tables.split(",")) {
                String sql;
                // "solid_tclean_" objects are tables even when flagged as views.
                if (isView && table.contains("solid_tclean_")) {
                    sql = String.format("drop table if exists %s cascade", table);
                } else {
                    sql = String
                            .format("drop %s if exists %s cascade", (isView ? "view" : "table"), table);
                }
                st.executeUpdate(sql);
            }
        } catch (Exception e) {
            // Include the cause in the log (was previously dropped).
            logger.error("JdbcDataProvider.dropRedundantTables() drop error,tables={}", tables, e);
            // Message corrected: the old one claimed "get jdbc connection error"
            // even when the failure was the drop statement itself.
            throw new DataScienceException(
                    String.format("drop redundant tables error, since %s", e.getMessage()), e);
        }
    }

    /**
     * Generate "drop external table" statements for every leftover
     * spark_*_*_*_* external table found in information_schema on datasource 1.
     *
     * @return one drop statement per matching table, or {@code null} on failure
     *         (legacy contract kept for callers)
     */
    public List<String> querySparkTables() {
        List<String> tables = Lists.newArrayList();
        String sql = "select CONCAT( 'drop external table if exists ', "
                + "table_schema, '.', table_name, ';' ) FROM information_schema.tables Where table_name"
                + " ~ '^spark_[a-zA-Z0-9]+_[a-zA-Z0-9]+_[a-zA-Z0-9]+_[a-zA-Z0-9]+$'";
        // try-with-resources closes all three resources; the old code closed
        // only the connection via JDBCUtil.close(conn, null, null).
        try (Connection conn = getConn(1L);
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(sql)) {
            while (rs.next()) {
                tables.add(rs.getString(1));
            }
        } catch (Exception e) {
            logger.error("JdbcDataProvider.querySparkTables() error, message={}", e.getMessage());
            return null;
        }
        return tables;
    }

    /** Statement timeout (seconds) applied to the query paths in this provider. */
    public final int SQL_QUERY_TIMEOUT_SECOND = 30;

    /** Max wait when acquiring a pooled connection (passed to Druid setMaxWait). */
    @Value("${greenplum.pool.max-wait-time}")
    private int DS_CONN_MAX_WAIT;

    /** Retry attempts after a connection error (Druid connectionErrorRetryAttempts). */
    @Value("${greenplum.pool.retry}")
    private int DS_CONN_RETRY_NUM;

    /** Pool upper bound (Druid maxActive). */
    @Value("${greenplum.pool.max-active}")
    private int DS_CONN_MAX_ACTIVE;

    /** Connections created at pool startup (Druid initialSize). */
    @Value("${greenplum.pool.init-size}")
    private int DS_CONN_INIT_ACTIVE;

    /** Minimum idle connections kept in the pool (Druid minIdle). */
    @Value("${greenplum.pool.min-idle}")
    private int DS_CONN_MIN_IDLE;

    // Template used by getColumnValues: SELECT DISTINCT(col) FROM table <filter>.
    private final String template = "SELECT DISTINCT(%s) FROM %s %s";

}
